package com.cnpc.bds.sdk.sink.kafka.handler.filter;

import com.cnpc.bds.sdk.sink.kafka.handler.AbstractHandler;

import java.util.Objects;
import java.util.function.Predicate;

/**
 * Handler that passes a message through unchanged when it satisfies a
 * caller-supplied condition, and yields {@code null} otherwise.
 *
 * <p>NOTE(review): a {@code null} result presumably signals to the surrounding
 * handler pipeline that the message was filtered out; confirm against
 * {@code AbstractHandler} and its callers.
 *
 * @param <T> type of the messages being filtered
 */
public class ConditionalMsgFilter<T> extends AbstractHandler<T, T> {

    /** Condition a message must satisfy to be passed through; never {@code null}. */
    private final Predicate<? super T> predicate;

    /**
     * Creates a filter backed by the given condition.
     *
     * @param predicate condition evaluated against every message; a predicate
     *                  declared for a supertype of {@code T} is accepted as well
     * @throws NullPointerException if {@code predicate} is {@code null}
     */
    public ConditionalMsgFilter(final Predicate<? super T> predicate) {
        // Fail at construction time rather than with an NPE on the first handled message.
        this.predicate = Objects.requireNonNull(predicate, "predicate");
    }

    /**
     * Evaluates the configured predicate against the message.
     *
     * @param msg message to evaluate; handed to the predicate as-is, without a null check
     * @return {@code msg} itself when the predicate accepts it, {@code null} otherwise
     */
    @Override
    public T handle(final T msg) {
        if (predicate.test(msg)) {
            return msg;
        }
        return null;
    }
}
